Object Streaming Tools
Helper functions to simplify creating and concatenating object streams in NodeJS
Motivation
Writing NodeJs streams can be quite challenging. While I myself have create
snippets in my IDE to make my life easier when creating them, frequently I
found myself writing the same code over and over again. Since I am lazy,
I don't really like working this way. And frankly, the German in me simply
wanted - nay demanded! - more efficient, DRYer code.
Looking for a solution early 2016, I first explored RxJs3 . While I
very much appreciated the beauty of that project's approach, it seemed overkill
for what I needed. And when I noticed the significant differences between
version 3 and the then up-and-coming version 4, I decided to take another path.
I also looked at Highland.js, which is very similar in its approach to our goals
here, but was not quit there yet, when we started this.
For the enterprise level application I was designing for and working on with a
team at Copperleaf Technologies, I cooked up my first few helper tools that we
then continued to develop as a team throughout the year.
At version 1.0, this library was at the state we shipped it with that application
in May 2017. Copperleaf Technologies has graciously allowed me to take ownership of the project,
so here it is.
I hope some of you will find it useful.
Basics
start with anything
const just = require( 'object-streaming-tools/lib/just' );
just( 'Hello World!' )
.on( 'data', console.log );
iterate
const fromArray = require( 'object-streaming-tools/lib/fromArray' );
fromArray( [ 1, 2, 3 ] )
.on( 'data', console.log );
iterate with the spread (...) operator
const just = require( 'object-streaming-tools/lib/just' );
just( ...[ 1, 2, 3 ] )
.on( 'data', console.log );
streamify non-streams
when using async functions
const just = require( 'object-streaming-tools/lib/just' );
const apply = require( 'object-streaming-tools/lib/apply' );
function log( s, next ){
console.log( s );
this.emit( 'bar', 'THIS IS SPARTA!' );
setImmediate( next, null, s );
}
just( 'foo' )
.pipe( apply( log ) )
.on( 'bar', console.log )
.resume();
when using synchronous functions
Note: Just demonstrating here a technique to achieve this using
asyncify
from the async library, which is also used
internally.
const range = require( 'object-streaming-tools/lib/range' );
const apply = require( 'object-streaming-tools/lib/apply' );
const asyncify = require( 'async/asyncify' );
range( 1, 3 )
.pipe( apply( asyncify( console.log ) ) )
.resume();
filter
const just = require( 'object-streaming-tools/lib/just' );
const filter = require( 'object-streaming-tools/lib/filter' );
const asyncify = require( 'async/asyncify' );
just( ...[ 0, 1, 2, 3 ] )
.pipe( filter( asyncify( ( x )=>x >= 2 ) ) )
.on( 'data', console.log );
just( ...[ 0, 1, 2, 3 ] )
.pipe( filter( asyncify( ( x )=>x >= 2 ) ) )
.on( filter.RejectedEventKey, console.log )
.resume();
get a range of numbers
const range = require( 'object-streaming-tools/lib/range' );
range( 1, 3 )
.on( 'data', console.log );
iterate over object properties
const just = require( 'object-streaming-tools/lib/just' );
const forIn = require( 'object-streaming-tools/lib/forIn' );
just( { foo: 'bar' } )
.pipe( forIn() )
.on( 'data', ( { key, value } )=>console.log( key, value ) );
group items in a list by a key's values
by using the key's identity
const just = require( 'object-streaming-tools/lib/just' );
const apply = require( 'object-streaming-tools/lib/apply' );
const asyncify = require( 'async/asyncify' );
const keyBy = require( 'object-streaming-tools/lib/keyBy' );
just( ...[ { foo: 'bar' }, { foo: 'baz' } ] )
.pipe( keyBy( 'foo' ) )
.pipe( apply( asyncify( JSON.stringify ) ) )
.pipe( apply( asyncify( console.log ) ) )
.resume();
by using a function to calculate the key
const just = require( 'object-streaming-tools/lib/just' );
const keyBy = require( 'object-streaming-tools/lib/keyBy' );
just( ...[ { foo: 'bar' }, { foo: 'baz' } ] )
.pipe( keyBy( ( { foo } )=>foo ) )
.on( 'data' , ( result )=>console.log( JSON.stringify( result ) ) );
Note: Both approaches above demonstrate various techniques to achieve the same
result. Neither technique is meant to be prescriptive.
emit values of an object
const just = require( 'object-streaming-tools/lib/just' );
const values = require( 'object-streaming-tools/lib/forIn' );
just( { foo: 'bar' } )
.pipe( values() )
.on( 'data', console.log );
switch things up
const range = require( 'object-streaming-tools/lib/range' );
const switchBy = require( 'object-streaming-tools/lib/switchBy' );
const asyncify = require( 'async/asyncify' );
const lookup = [ 'one', 'three', 'five' ];
const forTrue = { ifMatches: true, thenDo: asyncify( x=>lookup[ x ] ) };
const forFalse = { ifMatches: false, thenDo: asyncify( x=>x ) };
range( 1, 5 )
.pipe( switchBy( asyncify( x=>!!x % 2 ), [ forTrue, forFalse ] ) )
.on( 'data', console.log );
start from a callback
const fs = require( 'fs-extra' );
const fromCallback = require( 'object-streaming-tools/lib/fromCallback' );
fromCallback( fs.readJson.bind( null, 'list.json') )
.pipe( flatten() )
.on( 'data', console.log;
emit arrays of a specified length
const items = [1, 2, 3, 4, 5, 6];
just(...items)
.pipe( asLengthLimitedArrays( 4 ) )
.on( 'data', console.log );
creates a slice of the stream starting from start index and up to, but not including, end index
end defaults to Infinity
const items = ['val1', 'val2', 'val3', 'val4', 'val5', 'val6'];
const start = 2;
just(...items)
.pipe( emitRange( start ) )
.on( 'data', console.log );
from start to end
const items = ['val1', 'val2', 'val3', 'val4', 'val5', 'val6'];
const start = 2;
const end = 5;
just(...items)
.pipe( emitRange( start, end ) )
.on( 'data', console.log );
emit only unique items in a stream
unique
const items = [1, 2, 3, 4, 4, 5];
just(...items)
.pipe( unique() )
.on( 'data', console.log );
uniqueBy
via a string iteratee
const items = [{id: 'foo'}, {id: 'bar'}, {id: 'foo'}];
const attributeName = 'id'
just(...items)
.pipe( uniqueBy( attributeName ) )
.on( 'data', console.log );
via an iteratee function
const items = [2.1, 1.2, 2.3];
just(...items)
.pipe( uniqueBy( Math.floor ) )
.on( 'data', console.log );
TBC